package org.springboot.kafka.controller;

import javax.annotation.Resource;

import org.springboot.kafka.config.KafkaOriginalConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
public class KafkaController {
	
	@Value("${spring.kafka.one.topic}")
    private String topicOne;
	@Value("${spring.kafka.two.topic}")
    private String topicTwo;
	@Value("${spring.kafka.three.topic}")
    private String topicThree;
	
	@Autowired
	private KafkaOriginalConfig kafkaOriginalConfig;
	
	@Autowired
	@Qualifier("kafkaOneTemplate")
	private KafkaTemplate<String, Object> kafkaOneTemplate;
	
	@Resource(name = "kafkaTwoTemplate")
    private KafkaTemplate<String, Object> kafkaTwoTemplate;
	
	/**
	 * 基本概念：
	 * Producer: 生产者，消息的产生者，是消息的入口
	 * Consumer: 消费者，即消息的消费方，是消息的出口
	 * Consumer Group(消费者组):
	 *  - 每个消费者组都有一个组 id
	 *  - 同一个消费组者的消费者可以消费同一 topic 下不同分区的数据
	 *  - 组内多个消费者不会消费同一分区的数据
	 *  - 当 consumer 数量 > partition 数量，多出来的消费者不消费任何 partition 的数据。实际应用建议 consumer 的数量与 partition 的数量一致
	 * Zookeeper: kafka 集群依赖 zookeeper 来保存集群的的元信息，来保证系统的可用性
	 * kafka cluster:
	 *  - Broker: kafka 实例，每个服务器上有一个或多个 kafka 的实例，我们姑且认为每个 broker 对应一台服务器。每个 kafka 集群内的 broker 都有一个不重复的编号
	 *  - Topic: 消息的主题，可以理解为消息的分类，kafka 的数据就保存在 topic。在每个 broker 上都可以创建多个 topic，自动创建的 topic 分区数是 1，复制因子是 0
	 *  - Partition: Topic 的分区，每个 topic 可以有多个分区，分区的作用是做负载，提高 kafka 的吞吐量。同一个 topic 在不同的分区的数据是不重复的，partition 的表现形式就是一个一个的文件夹
	 *  - Replication: 每一个分区都有多个副本，副本的作用是做备胎。当主分区（Leader）故障的时候会选择一个备胎（Follower）上位，成为 Leader。在 kafka 中默认副本的最大数量是 10 个，
	 *  且副本的数量不能大于 Broker 的数量，follower 和 leader 绝对是在不同的机器，同一机器对同一个分区也只可能存放一个副本（包括自己）
	 *  - Message: 每一条发送的消息主体
	 *  
	 * 性质：
	 * 1. 注册 kafka 消费者，设置偏移量时，从设置位置向后消费；未设置偏移量时，默认从当前分组的上次 commit 位置向后消费
	 * 2. 通过设置 auto.offset.reset 参数更改消费模式:
	 *  - earliest: 当各分区下有已提交的 offset 时，从提交的 offset 开始消费；无提交的 offset 时，从头开始消费
	 *  - latest: 当各分区下有已提交的 offset 时，从提交的 offset 开始消费；无提交的 offset 时，消费新产生的该分区下的数据
	 *  - none: topic 各分区都存在已提交的 offset 时，从 offset 后开始消费；只要有一个分区不存在已提交的 offset，则抛出异常
	 */
	
	/**
	 *  消息发送流程：
	 *  1. producer 从集群获取分区的 leader
	 *  2. producer 将消息发给 leader (producer 采用 push 模式将数据发布到 broker，每条消息追加到分区中，顺序写入磁盘，所以保证同一分区内的数据是有序的)
	 *  3. leader 将消息写入本地文件
	 *  4. followers 主动从 leader pull 同步消息
	 *  5. followers 将消息写入本地后向 leader 发送 ACK
	 *  6. leader 收到所有副本的 ACK 后向 producer 发送 ACK
	 *  
	 *  分区存储：
	 *  1. 方便扩展: 因为一个 topic 可以有多个 partition，所以我们可以通过扩展机器去轻松的应对日益增长的数据量
	 *  2. 提高并发: 以 partition 为读写单位，可以多个消费者同时消费数据，提高了消息的处理效率
	 *  
	 *  ACK 应答机制：
	 *  1. 保证消息不丢失。在生产者向队列写入数据的时候可以设置参数来确定是否确认 kafka 接收到数据，这个参数可设置的值为 0、1、all
	 *  2. 0 代表 producer 往集群发送数据不需要等到集群的返回，不确保消息发送成功。安全性最低但是效率最高
	 *  3. 1 代表 producer 往集群发送数据只要 leader 应答就可以发送下一条，只确保 leader 发送成功
	 *  4. all 代表 producer 往集群发送数据需要所有的 follower 都完成从 leader 的同步才会发送下一条，确保 leader 发送成功和所有的副本都完成备份。安全性最高，但是效率最低
	 */
	
	/**
	 * kafka 数据保存：
	 * 1. Kafka 初始会单独开辟一块磁盘空间，顺序写入数据（效率比随机写入高）
	 * 2. Partition 结构
	 *  - Partition 在服务器上的表现形式就是一个一个的文件夹
	 *  - 每个 partition 的文件夹下面会有多组 segment 文件
	 *  - 每组 segment 文件又包含.index 文件、.log 文件、.timeindex 文件（早期版本中没有）三个文件
	 *  - log 文件就实际是存储 message 的地方，index 索引文件存储 log 中的消息偏移量和偏移字节数，timeindex 时间戳索引文件，用于检索消息
	 *  - 利用分段 + 索引的方式来解决查找效率的问题
	 * 3. Message 结构
	 *  - log 文件存储的 message 包含消息体、消息大小、offset、压缩类型等
	 *  - offset：是一个占 8byte 的有序 id 号，它可以唯一确定每条消息在 parition 内的位置
	 *  - 消息大小：占用 4byte，用于描述消息的大小
	 *  - 消息体：存放的是实际的消息数据（被压缩过），占用的空间根据具体的消息而不一样
	 * 4. 存储策略
	 *  - 基于时间，默认配置是 168 小时（7天）
	 *  - 基于大小，默认配置是 1073741824
	 *  - 注意：kafka 读取特定消息的时间复杂度是 O(1)，所以这里删除过期的文件并不会提高 kafka 的性能
	 */
	
	/**
	 * 消息消费过程：(偏移量举例，参考图示)
	 * 1. 消费时找 leader 去拉取消息
	 * 2. 根据 offset 定位到具体的 segment 日志文件：由于 log 日志文件的文件名是这个文件中第一条消息的 offset-1，找到 message 所在的 segment 文件（利用二分法查找）
	 * 3. 计算查找的 offset 在日志文件的相对偏移量
	 *  - 第一步，假设 offset 为 x，需要定位的 offset - segment 文件中第一条消息的 offset + 1，得到索引偏移量(indexOffer)
	 *  - 第二步，打开找到的 segment 中的.index 文件，利用二分法查找相对 indexOffset 小于或者等于指定的相对 indexOffset 的索引条目中最大的那个相对 indexOffset(稀疏索引)
	 *  - 第三步，根据 indexOffer 可以定位到该消息在日志文件中的偏移字节数(byteOffer)，直接读取文件夹.log，从位置为 byteOffer 的那个地方开始顺序扫描直到找到 offset 为 x 的那条 Message 
	 * 4. 注意：这套机制是建立在 offset 为有序的基础上，利用 segment + 有序 offset + 稀疏索引 + 二分查找 + 顺序查找等多种手段来高效的查找数据
	 * 
	 * 消费结束提交：
	 * 1. __consumer_offsets 是 kafka 自行创建的，和普通的 topic 相同。它存在的目的之一就是保存 consumer 提交的 offset
	 * 2. kafka 默认为该 topic 创建了 50 个分区，并且对每个 group.id 做哈希求模运算，从而将负载分散到不同的 __consumer_offsets 分区上
	 * 3. 保存内容可以当成一个 KV 格式的消息，key 就是一个三元组：group.id + topic + 分区号，而 value 就是 offset 的值
	 */
	
	@RequestMapping("/send")
    @ResponseBody
    public String send() {
		kafkaOneTemplate.send(topicOne, "kafka 消息 one");
		kafkaTwoTemplate.send(topicTwo, "kafka 消息 two");
		kafkaOriginalConfig.send(topicThree, "kafka 消息 three");
        return "success";
    }
}
